Examples

1 DQN with PrioritizedReplayBuffer

The following is a DQN [1] example, which also includes Double DQN [2] and Prioritized Experience Replay [3].

import os
import datetime

import numpy as np

import gym

import tensorflow as tf
from tensorflow.keras.models import Sequential, clone_model
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.summary import create_file_writer


from cpprb import ReplayBuffer, PrioritizedReplayBuffer


gamma = 0.99
batch_size = 1024

N_iteration = int(1e+5)
target_update_freq = 1000
eval_freq = 100

egreedy = 0.1



# Log
dir_name = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
logdir = os.path.join("logs", dir_name)
writer = create_file_writer(logdir + "/metrics")
writer.set_as_default()


# Env
env = gym.make('CartPole-v1')
eval_env = gym.make('CartPole-v1')

# For CartPole: input 4, output 2
model = Sequential([Dense(64,activation='relu',
                          input_shape=env.observation_space.shape),
                    Dense(64,activation='relu'),
                    Dense(env.action_space.n)])
target_model = clone_model(model)


# Loss Function

@tf.function
def Huber_loss(absTD):
    # Huber-style loss: quadratic for |TD| <= 1, linear above
    return tf.where(absTD > 1.0, absTD, tf.math.square(absTD))

@tf.function
def MSE(absTD):
    return tf.math.square(absTD)

loss_func = Huber_loss


optimizer = Adam()


buffer_size = int(1e+6)
env_dict = {"obs":{"shape": env.observation_space.shape},
            "act":{"shape": 1,"dtype": np.ubyte},
            "rew": {},
            "next_obs": {"shape": env.observation_space.shape},
            "done": {}}

# Nstep
nstep = 3
# nstep = False

if nstep:
    Nstep = {"size": nstep, "rew": "rew", "next": "next_obs"}
    discount = tf.constant(gamma ** nstep)
else:
    Nstep = None
    discount = tf.constant(gamma)
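# (With Nstep set, cpprb composes each stored transition from nstep
#  consecutive steps: the sampled "rew" becomes an n-step return and
#  "next_obs" the observation nstep steps ahead, so the bootstrap
#  term above is discounted by gamma**nstep.)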


# Prioritized Experience Replay: https://arxiv.org/abs/1511.05952
# See https://ymd_h.gitlab.io/cpprb/features/per/
prioritized = True

if prioritized:
    rb = PrioritizedReplayBuffer(buffer_size,env_dict,Nstep=Nstep)

    # Anneal beta linearly from 0.4 to 1 over the course of training,
    # as suggested in the Prioritized Experience Replay paper.
    beta = 0.4
    beta_step = (1 - beta)/N_iteration
else:
    rb = ReplayBuffer(buffer_size,env_dict,Nstep=Nstep)


@tf.function
def Q_func(model,obs,act,act_shape):
    return tf.reduce_sum(model(obs) * tf.one_hot(act,depth=act_shape), axis=1)

@tf.function
def DQN_target_func(model,target,next_obs,rew,done,gamma,act_shape):
    # y = rew + gamma * max_a Q_target(next_obs, a), with no bootstrap
    # term for terminal transitions
    return gamma*tf.reduce_max(target(next_obs),axis=1)*(1.0-done) + rew

@tf.function
def Double_DQN_target_func(model,target,next_obs,rew,done,gamma,act_shape):
    """
    Double DQN: https://arxiv.org/abs/1509.06461

    y = rew + gamma * Q_target(next_obs, argmax_a Q(next_obs, a))
    """
    act = tf.math.argmax(model(next_obs),axis=1)
    return gamma*tf.reduce_sum(target(next_obs)*tf.one_hot(act,depth=act_shape), axis=1)*(1.0-done) + rew


# Swap in DQN_target_func for vanilla DQN
target_func = Double_DQN_target_func



def evaluate(model,env):
    obs = env.reset()
    total_rew = 0

    while True:
        Q = tf.squeeze(model(obs.reshape(1,-1)))
        act = np.argmax(Q)
        obs, rew, done, _ = env.step(act)
        total_rew += rew

        if done:
            return total_rew

# Start Experiment

observation = env.reset()

# Warming up
for n_step in range(100):
    action = env.action_space.sample() # Random Action
    next_observation, reward, done, info = env.step(action)
    rb.add(obs=observation,
           act=action,
           rew=reward,
           next_obs=next_observation,
           done=done)
    observation = next_observation
    if done:
        observation = env.reset()
        rb.on_episode_end()


n_episode = 0
observation = env.reset()
for n_step in range(N_iteration):

    if np.random.rand() < egreedy:
        action = env.action_space.sample()
    else:
        Q = tf.squeeze(model(observation.reshape(1,-1)))
        action = np.argmax(Q)

    next_observation, reward, done, info = env.step(action)
    rb.add(obs=observation,
           act=action,
           rew=reward,
           next_obs=next_observation,
           done=done)
    observation = next_observation

    if prioritized:
        sample = rb.sample(batch_size,beta)
        beta += beta_step
    else:
        sample = rb.sample(batch_size)

    weights = sample["weights"].ravel() if prioritized else tf.constant(1.0)

    with tf.GradientTape() as tape:
        tape.watch(model.trainable_weights)
        Q =  Q_func(model,
                    tf.constant(sample["obs"]),
                    tf.constant(sample["act"].ravel()),
                    tf.constant(env.action_space.n))
        target_Q = target_func(model,target_model,
                               tf.constant(sample['next_obs']),
                               tf.constant(sample["rew"].ravel()),
                               tf.constant(sample["done"].ravel()),
                               discount,
                               tf.constant(env.action_space.n))
        absTD = tf.math.abs(target_Q - Q)
        loss = tf.reduce_mean(loss_func(absTD)*weights)

    grad = tape.gradient(loss,model.trainable_weights)
    optimizer.apply_gradients(zip(grad,model.trainable_weights))
    tf.summary.scalar("Loss vs training step", data=loss, step=n_step)


    if prioritized:
        # Recompute TD errors with the updated network and write
        # them back as the new priorities.
        Q = Q_func(model,
                   tf.constant(sample["obs"]),
                   tf.constant(sample["act"].ravel()),
                   tf.constant(env.action_space.n))
        absTD = tf.math.abs(target_Q - Q)
        rb.update_priorities(sample["indexes"],absTD)

    if done:
        observation = env.reset()
        rb.on_episode_end()
        n_episode += 1

    if n_step % target_update_freq == 0:
        target_model.set_weights(model.get_weights())

    if n_step % eval_freq == eval_freq-1:
        eval_rew = evaluate(model,eval_env)
        tf.summary.scalar("episode reward vs training step",data=eval_rew,step=n_step)

2 Loss Adjusted Prioritization

The following is a LAP [4] example. See the theory page.
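
As a rough sketch of the idea: drawing index $i$ with probability $p_i / \sum_j p_j$ under a per-sample loss $L$ gives the expected gradient

$$\mathbb{E}_{i \sim p}\left[\nabla_\theta L(\delta_i)\right] = \sum_i \frac{p_i}{\sum_j p_j}\,\nabla_\theta L(\delta_i),$$

so prioritized sampling is, in expectation, uniform sampling with each term reweighted by its priority. LAP chooses the priority max(|TD|, 1) together with the Huber loss so that this implicit reweighting corresponds to a well-behaved uniform-sampling loss. Accordingly, the code below sets eps=0 (no offset added to priorities), samples with beta=0.0 (no importance-sampling weights), and clips priorities from below at 1 in update_priorities.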

import os
import datetime

import numpy as np

import gym

import tensorflow as tf
from tensorflow.keras.models import Sequential, clone_model
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.summary import create_file_writer


from cpprb import ReplayBuffer, PrioritizedReplayBuffer


gamma = 0.99
batch_size = 1024

N_iteration = int(1e+5)
target_update_freq = 1000
eval_freq = 100

egreedy = 0.1



# Log
dir_name = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
logdir = os.path.join("logs", dir_name)
writer = create_file_writer(logdir + "/metrics")
writer.set_as_default()


# Env
env = gym.make('CartPole-v1')
eval_env = gym.make('CartPole-v1')

# For CartPole: input 4, output 2
model = Sequential([Dense(64,activation='relu',
                          input_shape=env.observation_space.shape),
                    Dense(64,activation='relu'),
                    Dense(env.action_space.n)])
target_model = clone_model(model)


# Loss Function

@tf.function
def Huber_loss(absTD):
    return tf.where(absTD > 1.0, absTD, tf.math.square(absTD))


loss_func = Huber_loss


optimizer = Adam()


buffer_size = int(1e+6)
env_dict = {"obs":{"shape": env.observation_space.shape},
            "act":{"shape": 1,"dtype": np.ubyte},
            "rew": {},
            "next_obs": {"shape": env.observation_space.shape},
            "done": {}}

# Nstep
nstep = 3
# nstep = False

if nstep:
    Nstep = {"size": nstep, "rew": "rew", "next": "next_obs"}
    discount = tf.constant(gamma ** nstep)
else:
    Nstep = None
    discount = tf.constant(gamma)


# LAP: eps=0, so no offset is added to the priorities
rb = PrioritizedReplayBuffer(buffer_size,env_dict,Nstep=Nstep,eps=0)



@tf.function
def Q_func(model,obs,act,act_shape):
    return tf.reduce_sum(model(obs) * tf.one_hot(act,depth=act_shape), axis=1)

@tf.function
def DQN_target_func(model,target,next_obs,rew,done,gamma,act_shape):
    return gamma*tf.reduce_max(target(next_obs),axis=1)*(1.0-done) + rew

@tf.function
def Double_DQN_target_func(model,target,next_obs,rew,done,gamma,act_shape):
    """
    Double DQN: https://arxiv.org/abs/1509.06461
    """
    act = tf.math.argmax(model(next_obs),axis=1)
    return gamma*tf.reduce_sum(target(next_obs)*tf.one_hot(act,depth=act_shape), axis=1)*(1.0-done) + rew


target_func = Double_DQN_target_func



def evaluate(model,env):
    obs = env.reset()
    total_rew = 0

    while True:
        Q = tf.squeeze(model(obs.reshape(1,-1)))
        act = np.argmax(Q)
        obs, rew, done, _ = env.step(act)
        total_rew += rew

        if done:
            return total_rew

# Start Experiment

observation = env.reset()

# Warming up
for n_step in range(100):
    action = env.action_space.sample() # Random Action
    next_observation, reward, done, info = env.step(action)
    rb.add(obs=observation,
           act=action,
           rew=reward,
           next_obs=next_observation,
           done=done)
    observation = next_observation
    if done:
        observation = env.reset()
        rb.on_episode_end()


n_episode = 0
observation = env.reset()
for n_step in range(N_iteration):

    if np.random.rand() < egreedy:
        action = env.action_space.sample()
    else:
        Q = tf.squeeze(model(observation.reshape(1,-1)))
        action = np.argmax(Q)

    next_observation, reward, done, info = env.step(action)
    rb.add(obs=observation,
           act=action,
           rew=reward,
           next_obs=next_observation,
           done=done)
    observation = next_observation

    # LAP samples without importance-sampling correction (beta=0)
    sample = rb.sample(batch_size,beta=0.0)

    with tf.GradientTape() as tape:
        tape.watch(model.trainable_weights)
        Q =  Q_func(model,
                    tf.constant(sample["obs"]),
                    tf.constant(sample["act"].ravel()),
                    tf.constant(env.action_space.n))
        target_Q = target_func(model,target_model,
                               tf.constant(sample['next_obs']),
                               tf.constant(sample["rew"].ravel()),
                               tf.constant(sample["done"].ravel()),
                               discount,
                               tf.constant(env.action_space.n))
        absTD = tf.math.abs(target_Q - Q)
        loss = tf.reduce_mean(loss_func(absTD))

    grad = tape.gradient(loss,model.trainable_weights)
    optimizer.apply_gradients(zip(grad,model.trainable_weights))
    tf.summary.scalar("Loss vs training step", data=loss, step=n_step)


    # Recompute TD errors and write back the new priorities,
    # clipped from below at 1 as required by LAP.
    Q = Q_func(model,
               tf.constant(sample["obs"]),
               tf.constant(sample["act"].ravel()),
               tf.constant(env.action_space.n))
    absTD = tf.math.abs(target_Q - Q)
    rb.update_priorities(sample["indexes"],tf.math.maximum(absTD,tf.constant(1.0)))

    if done:
        observation = env.reset()
        rb.on_episode_end()
        n_episode += 1

    if n_step % target_update_freq == 0:
        target_model.set_weights(model.get_weights())

    if n_step % eval_freq == eval_freq-1:
        eval_rew = evaluate(model,eval_env)
        tf.summary.scalar("episode reward vs training step",data=eval_rew,step=n_step)

3 Multiprocess Learning (Ape-X) with MPPrioritizedReplayBuffer

The following is an Ape-X [5] example: explorer processes fill a shared MPPrioritizedReplayBuffer with initial priorities, while a learner process samples from it, trains, and periodically broadcasts updated weights. See Multiprocess Learning (Ape-X) for details.

from multiprocessing import Process, Event, SimpleQueue
import time

import gym
import numpy as np
from tqdm import tqdm

from cpprb import ReplayBuffer, MPPrioritizedReplayBuffer


class MyModel:
    def __init__(self):
        self._weights = 0

    def get_action(self,obs):
        # Implement action selection
        return 0

    def abs_TD_error(self,sample):
        # Implement absolute TD error
        return np.zeros(sample["obs"].shape[0])

    @property
    def weights(self):
        return self._weights

    @weights.setter
    def weights(self,w):
        self._weights = w

    def train(self,sample):
        # Implement model update
        pass


def explorer(global_rb,env_dict,is_training_done,queue):
    local_buffer_size = int(1e+2)
    local_rb = ReplayBuffer(local_buffer_size,env_dict)

    model = MyModel()
    env = gym.make("CartPole-v1")

    obs = env.reset()
    while not is_training_done.is_set():
        if not queue.empty():
            w = queue.get()
            model.weights = w

        action = model.get_action(obs)
        next_obs, reward, done, _ = env.step(action)
        local_rb.add(obs=obs,act=action,rew=reward,next_obs=next_obs,done=done)

        if done:
            local_rb.on_episode_end()
            obs = env.reset()
        else:
            obs = next_obs

        if local_rb.get_stored_size() == local_buffer_size:
            # Flush the local buffer into the shared buffer in a single
            # batch, using TD errors as initial priorities, so that
            # inter-process synchronization stays infrequent.
            local_sample = local_rb.get_all_transitions()
            local_rb.clear()

            absTD = model.abs_TD_error(local_sample)
            global_rb.add(**local_sample,priorities=absTD)


def learner(global_rb,queues):
    batch_size = 64
    n_warmup = 100
    n_training_step = int(1e+4)
    explorer_update_freq = 100

    model = MyModel()

    while global_rb.get_stored_size() < n_warmup:
        time.sleep(1)

    for step in tqdm(range(n_training_step)):
        sample = global_rb.sample(batch_size)

        model.train(sample)
        absTD = model.abs_TD_error(sample)
        global_rb.update_priorities(sample["indexes"],absTD)

        if step % explorer_update_freq == 0:
            w = model.weights
            for q in queues:
                q.put(w)


if __name__ == "__main__":
    buffer_size = int(1e+6)
    env_dict = {"obs": {"shape": 4},
                "act": {},
                "rew": {},
                "next_obs": {"shape": 4},
                "done": {}}
    n_explorer = 4

    global_rb = MPPrioritizedReplayBuffer(buffer_size,env_dict)

    is_training_done = Event()
    is_training_done.clear()

    qs = [SimpleQueue() for _ in range(n_explorer)]
    ps = [Process(target=explorer,
                  args=[global_rb,env_dict,is_training_done,q])
          for q in qs]

    for p in ps:
        p.start()

    learner(global_rb,qs)
    is_training_done.set()

    for p in ps:
        p.join()

    print(global_rb.get_stored_size())

4 Create ReplayBuffer for non-simple gym.Env with helper functions

# create_buffer_with_helper_func.py
#
# Create `ReplayBuffer` for non-simple space `gym.Env` with helper functions.


import gym
from cpprb import ReplayBuffer, create_env_dict, create_before_add_func

env = gym.make("Blackjack-v0")
# https://github.com/openai/gym/blob/master/gym/envs/toy_text/blackjack.py
# BlackjackEnv
#   observation_space: Tuple(Discrete(32),Discrete(11),Discrete(2))
#   action_space     : Discrete(2)


env_dict = create_env_dict(env)
# >>> env_dict
#{'act': {'add_shape': array([-1,  1]), 'dtype': numpy.int32, 'shape': 1},
# 'done': {'add_shape': array([-1,  1]), 'dtype': numpy.float32, 'shape': 1},
# 'next_obs0': {'add_shape': array([-1,  1]), 'dtype': numpy.int32, 'shape': 1},
# 'next_obs1': {'add_shape': array([-1,  1]), 'dtype': numpy.int32, 'shape': 1},
# 'next_obs2': {'add_shape': array([-1,  1]), 'dtype': numpy.int32, 'shape': 1},
# 'obs0': {'add_shape': array([-1,  1]), 'dtype': numpy.int32, 'shape': 1},
# 'obs1': {'add_shape': array([-1,  1]), 'dtype': numpy.int32, 'shape': 1},
# 'obs2': {'add_shape': array([-1,  1]), 'dtype': numpy.int32, 'shape': 1},
# 'rew': {'add_shape': array([-1,  1]), 'dtype': numpy.float32, 'shape': 1}}



rb = ReplayBuffer(256, env_dict)


obs = env.reset()
before_add = create_before_add_func(env)

for i in range(400):
    act = env.action_space.sample()
    next_obs, rew, done, _ = env.step(act)

    rb.add(**before_add(obs=obs,act=act,next_obs=next_obs,rew=rew,done=done))
    # Create `dict` for `ReplayBuffer.add`

    if done:
        obs = env.reset()
    else:
        obs = next_obs
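
# For illustration (values are hypothetical): a Blackjack observation is a
# 3-tuple such as (14, 10, 0), and `before_add` flattens it into the keys
# expected by the buffer, roughly
#   {"obs0": 14, "obs1": 10, "obs2": 0,
#    "next_obs0": ..., "next_obs1": ..., "next_obs2": ...,
#    "act": ..., "rew": ..., "done": ...}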

  1. V. Mnih et al., “Human-level control through deep reinforcement learning”, Nature 518, 529-533 (2015)

  2. H. van Hasselt et al., “Deep Reinforcement Learning with Double Q-Learning”, Proc. AAAI 30, 1 (2016) (arXiv:1509.06461)

  3. T. Schaul et al., “Prioritized Experience Replay”, ICLR (2016)

  4. S. Fujimoto et al., “An Equivalence between Loss Functions and Non-Uniform Sampling in Experience Replay” (2020), arXiv:2007.06049

  5. D. Horgan et al., “Distributed Prioritized Experience Replay”, ICLR (2018) (arXiv:1803.00933)